// Copyright Epic Games, Inc. All Rights Reserved.

#include <zencore/jobqueue.h>

#include <zencore/except.h>
#include <zencore/scopeguard.h>
#include <zencore/thread.h>
#include <zencore/workthreadpool.h>

#if ZEN_WITH_TESTS
#	include <zencore/testing.h>
#endif	// ZEN_WITH_TESTS

#include <deque>
#include <thread>
#include <unordered_map>

namespace zen {

namespace JobClock {
	using ClockSource = std::chrono::system_clock;

	using Tick = ClockSource::rep;

	Tick Never() { return ClockSource::time_point::min().time_since_epoch().count(); }
	Tick Always() { return ClockSource::time_point::max().time_since_epoch().count(); }
	Tick Now() { return ClockSource::now().time_since_epoch().count(); }

	ClockSource::time_point TimePointFromTick(const Tick TickCount) { return ClockSource::time_point{ClockSource::duration{TickCount}}; }
}  // namespace JobClock

class JobQueueImpl;

class JobQueueImpl : public JobQueue
{
public:
	struct Job : public RefCounted, public JobContext
	{
		JobQueueImpl*	 Queue;
		std::string		 Name;
		JobId			 Id;
		JobFunction		 Callback;
		std::atomic_bool CancelFlag;
		State			 State;
		JobClock::Tick	 CreateTick;
		JobClock::Tick	 StartTick;
		JobClock::Tick	 EndTick;
		int				 WorkerThreadId;

		virtual bool IsCancelled() const override { return CancelFlag.load(); }
		virtual void ReportMessage(std::string_view Message) override { Queue->ReportMessage(Id, Message); }
		virtual void ReportProgress(std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) override
		{
			Queue->ReportProgress(Id, CurrentOp, CurrentOpPercentComplete);
		}
	};

	JobQueueImpl(int WorkerCount, std::string_view QueueName) : WorkerPool(WorkerCount, QueueName), WorkerCounter(1)
	{
		InitializedFlag.store(true);
	}

	virtual ~JobQueueImpl()
	{
		try
		{
			if (InitializedFlag)
			{
				Stop();
			}
		}
		catch (const std::exception& Ex)
		{
			ZEN_WARN("Failed shutting down jobqueue. Reason: '{}'", Ex.what());
		}
	}

	virtual JobId QueueJob(std::string_view Name, JobFunction&& JobFunc) override
	{
		ZEN_ASSERT(InitializedFlag);

		uint64_t NewJobId = IdGenerator.fetch_add(1);
		if (NewJobId == 0)
		{
			IdGenerator.fetch_add(1);
		}
		RefPtr<Job> NewJob(new Job());
		NewJob->Queue		   = this;
		NewJob->Name		   = Name;
		NewJob->Callback	   = std::move(JobFunc);
		NewJob->CancelFlag	   = false;
		NewJob->Id			   = JobId{.Id = NewJobId};
		NewJob->CreateTick	   = JobClock::Now();
		NewJob->StartTick	   = JobClock::Never();
		NewJob->EndTick		   = JobClock::Never();
		NewJob->WorkerThreadId = 0;

		ZEN_DEBUG("Scheduling background job {}:'{}'", NewJob->Id.Id, NewJob->Name);
		QueueLock.WithExclusiveLock([&]() { QueuedJobs.emplace_back(std::move(NewJob)); });
		WorkerCounter.AddCount(1);
		try
		{
			WorkerPool.ScheduleWork([&]() {
				auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); });
				Worker();
			});
			return {.Id = NewJobId};
		}
		catch (const std::exception& Ex)
		{
			WorkerCounter.CountDown();
			QueueLock.WithExclusiveLock([&]() {
				if (auto It = std::find_if(QueuedJobs.begin(),
										   QueuedJobs.end(),
										   [NewJobId](const RefPtr<Job>& Job) { return Job->Id.Id == NewJobId; });
					It != QueuedJobs.end())
				{
					QueuedJobs.erase(It);
				}
			});
			ZEN_ERROR("Failed to schedule job {}:'{}' to job queue. Reason: ''", NewJob->Id.Id, NewJob->Name, Ex.what());
			throw;
		}
	}

	virtual bool CancelJob(JobId Id) override
	{
		bool Result = false;
		QueueLock.WithExclusiveLock([&]() {
			if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end())
			{
				ZEN_DEBUG("Cancelling running background job {}:'{}'", It->second->Id.Id, It->second->Name);
				It->second->CancelFlag.store(true);
				Result = true;
				return;
			}
			if (auto It = CompletedJobs.find(Id.Id); It != CompletedJobs.end())
			{
				Result = true;
				return;
			}
			if (auto It = AbortedJobs.find(Id.Id); It != AbortedJobs.end())
			{
				Result = true;
				return;
			}
			if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; });
				It != QueuedJobs.end())
			{
				ZEN_DEBUG("Cancelling queued background job {}:'{}'", (*It)->Id.Id, (*It)->Name);
				QueuedJobs.erase(It);
				Result = true;
				return;
			}
		});
		return Result;
	}

	virtual void Stop() override
	{
		ZEN_DEBUG("Stopping jobqueue");
		if (!InitializedFlag)
		{
			return;
		}
		InitializedFlag.store(false);
		QueueLock.WithExclusiveLock([&]() {
			for (auto& Job : RunningJobs)
			{
				Job.second->CancelFlag.store(true);
			}
			QueuedJobs.clear();
		});
		WorkerCounter.CountDown();
		while (true)
		{
			size_t RunningJobCount = 0;
			QueueLock.WithExclusiveLock([&]() {
				for (auto& Job : RunningJobs)
				{
					Job.second->CancelFlag.store(true);
					ZEN_INFO("Cancelling background job {}:'{}'", Job.second->Id.Id, Job.second->Name);
					RunningJobCount++;
				}
				QueuedJobs.clear();
			});
			if (RunningJobCount == 0)
			{
				WorkerCounter.Wait();
				break;
			}
			ptrdiff_t Remaining = WorkerCounter.Remaining();
			if (Remaining > 0)
			{
				ZEN_INFO("Waiting for {} background jobs to complete", Remaining);
				WorkerCounter.Wait(500);
			}
		}
	}

	virtual std::vector<JobInfo> GetJobs() override
	{
		std::vector<JobId> DeadJobs;
		auto			   IsStale = [](JobClock::Tick Time) {
			  ZEN_ASSERT_SLOW(Time != JobClock::Never());
			  const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
			  std::chrono::system_clock::duration		  Age = Now - JobClock::TimePointFromTick(Time);
			  return std::chrono::duration_cast<std::chrono::days>(Age) > std::chrono::days(1);
		};

		std::vector<JobInfo> Jobs;
		QueueLock.WithSharedLock([&]() {
			for (auto It : RunningJobs)
			{
				Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Running});
			}
			for (auto It : CompletedJobs)
			{
				if (IsStale(It.second->EndTick))
				{
					DeadJobs.push_back(JobId{It.first});
					continue;
				}
				Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Completed});
			}
			for (auto It : AbortedJobs)
			{
				if (IsStale(It.second->EndTick))
				{
					DeadJobs.push_back(JobId{It.first});
					continue;
				}
				Jobs.push_back({.Id = JobId{It.first}, .Status = Status::Aborted});
			}
			for (auto It : QueuedJobs)
			{
				Jobs.push_back({.Id = It->Id, .Status = Status::Queued});
			}
		});
		if (!DeadJobs.empty())
		{
			QueueLock.WithExclusiveLock([&]() {
				for (JobId Id : DeadJobs)
				{
					AbortedJobs.erase(Id.Id);
					CompletedJobs.erase(Id.Id);
				}
			});
		}
		return Jobs;
	}

	// Will only respond once when Complete is true
	virtual std::optional<JobDetails> Get(JobId Id) override
	{
		auto Convert = [](Status Status, Job& Job) -> JobDetails {
			return JobDetails{.Name			  = Job.Name,
							  .Status		  = Status,
							  .State		  = {.CurrentOp				   = Job.State.CurrentOp,
												 .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete,
												 .Messages				   = std::move(Job.State.Messages),
												 .AbortReason			   = Job.State.AbortReason},
							  .CreateTime	  = JobClock::TimePointFromTick(Job.CreateTick),
							  .StartTime	  = JobClock::TimePointFromTick(Job.StartTick),
							  .EndTime		  = JobClock::TimePointFromTick(Job.EndTick),
							  .WorkerThreadId = Job.WorkerThreadId};
		};

		std::optional<JobDetails> Result;
		QueueLock.WithExclusiveLock([&]() {
			if (auto It = RunningJobs.find(Id.Id); It != RunningJobs.end())
			{
				Result = Convert(Status::Running, *It->second);
				return;
			}
			if (auto It = CompletedJobs.find(Id.Id); It != CompletedJobs.end())
			{
				Result = Convert(Status::Completed, *It->second);
				CompletedJobs.erase(It);
				return;
			}
			if (auto It = AbortedJobs.find(Id.Id); It != AbortedJobs.end())
			{
				Result = Convert(Status::Aborted, *It->second);
				AbortedJobs.erase(It);
				return;
			}
			if (auto It = std::find_if(QueuedJobs.begin(), QueuedJobs.end(), [&Id](const RefPtr<Job>& Job) { return Job->Id.Id == Id.Id; });
				It != QueuedJobs.end())
			{
				Result = Convert(Status::Queued, *(*It));
				return;
			}
		});
		return Result;
	}

	void ReportMessage(JobId Id, std::string_view Message)
	{
		QueueLock.WithExclusiveLock([&]() {
			auto It = RunningJobs.find(Id.Id);
			ZEN_ASSERT(It != RunningJobs.end());
			It->second->State.Messages.push_back(std::string(Message));
		});
	}

	void ReportProgress(JobId Id, std::string_view CurrentOp, uint32_t CurrentOpPercentComplete)
	{
		QueueLock.WithExclusiveLock([&]() {
			auto It = RunningJobs.find(Id.Id);
			ZEN_ASSERT(It != RunningJobs.end());
			It->second->State.CurrentOp				   = CurrentOp;
			It->second->State.CurrentOpPercentComplete = CurrentOpPercentComplete;
		});
	}

	std::atomic_uint64_t IdGenerator = 1;

	std::atomic_bool						  InitializedFlag = false;
	RwLock									  QueueLock;
	std::deque<RefPtr<Job>>					  QueuedJobs;
	std::unordered_map<uint64_t, Job*>		  RunningJobs;
	std::unordered_map<uint64_t, RefPtr<Job>> CompletedJobs;
	std::unordered_map<uint64_t, RefPtr<Job>> AbortedJobs;

	WorkerThreadPool WorkerPool;
	Latch			 WorkerCounter;

	void Worker()
	{
		int			CurrentThreadId = GetCurrentThreadId();
		RefPtr<Job> CurrentJob;
		QueueLock.WithExclusiveLock([&]() {
			if (!QueuedJobs.empty())
			{
				CurrentJob = std::move(QueuedJobs.front());
				ZEN_ASSERT(CurrentJob);
				QueuedJobs.pop_front();
				RunningJobs.insert_or_assign(CurrentJob->Id.Id, CurrentJob);
				CurrentJob->StartTick	   = JobClock::Now();
				CurrentJob->WorkerThreadId = CurrentThreadId;
			}
		});
		if (!CurrentJob)
		{
			return;
		}

		try
		{
			SetCurrentThreadName(fmt::format("BkgJob: {}", CurrentJob->Name));
			ZEN_DEBUG("Executing background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name);
			CurrentJob->Callback(*CurrentJob);
			ZEN_DEBUG("Completed background job {}:'{}'", CurrentJob->Id.Id, CurrentJob->Name);
			QueueLock.WithExclusiveLock([&]() {
				CurrentJob->EndTick		   = JobClock::Now();
				CurrentJob->WorkerThreadId = 0;
				RunningJobs.erase(CurrentJob->Id.Id);
				CompletedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
			});
		}
		catch (const AssertException& Ex)
		{
			ZEN_DEBUG("Background job {}:'{}' asserted. Reason: {}", CurrentJob->Id.Id, CurrentJob->Name, Ex.FullDescription());
			QueueLock.WithExclusiveLock([&]() {
				CurrentJob->State.AbortReason = Ex.FullDescription();
				CurrentJob->EndTick			  = JobClock::Now();
				CurrentJob->WorkerThreadId	  = 0;
				RunningJobs.erase(CurrentJob->Id.Id);
				AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
			});
		}
		catch (const std::exception& Ex)
		{
			ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what());
			QueueLock.WithExclusiveLock([&]() {
				CurrentJob->State.AbortReason = Ex.what();
				CurrentJob->EndTick			  = JobClock::Now();
				CurrentJob->WorkerThreadId	  = 0;
				RunningJobs.erase(CurrentJob->Id.Id);
				AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob));
			});
		}
		SetCurrentThreadName(fmt::format("JobQueueImpl::Worker {}", GetCurrentThreadId()));
	}
};

std::string_view
JobQueue::ToString(Status Status)
{
	using namespace std::literals;

	switch (Status)
	{
		case JobQueue::Status::Queued:
			return "Queued"sv;
			break;
		case JobQueue::Status::Running:
			return "Running"sv;
			break;
		case JobQueue::Status::Aborted:
			return "Aborted"sv;
			break;
		case JobQueue::Status::Completed:
			return "Completed"sv;
			break;
		default:
			ZEN_ASSERT(false);
	}
	return ""sv;
}

std::unique_ptr<JobQueue>
MakeJobQueue(int WorkerCount, std::string_view QueueName)
{
	return std::make_unique<JobQueueImpl>(WorkerCount, QueueName);
}

//////////////////////////////////////////////////////////////////////////

#if ZEN_WITH_TESTS

void
jobqueue_forcelink()
{
}

TEST_CASE("JobQueue")
{
	std::unique_ptr<JobQueue> Queue(MakeJobQueue(2, "queue"));
	WorkerThreadPool		  Pool(4);
	Latch					  JobsLatch(1);
	for (uint32_t I = 0; I < 100; I++)
	{
		JobsLatch.AddCount(1);
		Pool.ScheduleWork([&Queue, &JobsLatch, I]() {
			auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
			JobsLatch.AddCount(1);
			auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) {
				auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
				if (Context.IsCancelled())
				{
					return;
				}
				Context.ReportProgress("going to sleep", 0);
				Sleep(10);
				if (Context.IsCancelled())
				{
					return;
				}
				Context.ReportProgress("going to sleep again", 50);
				if ((I & 0xFF) == 0x10)
				{
					zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
				}
				Sleep(10);
				if (Context.IsCancelled())
				{
					return;
				}
				Context.ReportProgress("done", 100);
			});
		});
	}

	auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string {
		ExtendableStringBuilder<128> SB;
		if (Strings.empty())
		{
			return {};
		}
		auto It = Strings.begin();
		SB.Append(*It);
		It++;
		while (It != Strings.end())
		{
			SB.Append(Delimiter);
			SB.Append(*It);
			It++;
		}
		return SB.ToString();
	};

	JobsLatch.CountDown();
	while (true)
	{
		bool						   PendingQueue = JobsLatch.Remaining() > 0;
		size_t						   PendingCount = 0;
		std::vector<JobId>			   RemainingJobs;
		std::vector<JobQueue::JobInfo> Statuses = Queue->GetJobs();
		RemainingJobs.reserve(Statuses.size());
		for (const auto& It : Statuses)
		{
			JobQueue::Status Status = It.Status;

			JobId								Id = It.Id;
			std::optional<JobQueue::JobDetails> CurrentState;
			if (Status != JobQueue::Status::Queued)
			{
				CurrentState = Queue->Get(Id);
				CHECK(CurrentState.has_value());
			}
			switch (Status)
			{
				case JobQueue::Status::Queued:
					PendingCount++;
					RemainingJobs.push_back(Id);
					break;
				case JobQueue::Status::Running:
					ZEN_DEBUG("{} running. '{}' {}% '{}'",
							  Id.Id,
							  CurrentState->State.CurrentOp,
							  CurrentState->State.CurrentOpPercentComplete,
							  Join(CurrentState->State.Messages, " "sv));
					RemainingJobs.push_back(Id);
					break;
				case JobQueue::Status::Aborted:
					ZEN_DEBUG("{} aborted. Reason: '{}'", Id.Id, CurrentState->State.AbortReason);
					break;
				case JobQueue::Status::Completed:
					ZEN_DEBUG("{} completed. '{}'", Id.Id, Join(CurrentState->State.Messages, " "sv));
					break;
				default:
					CHECK(false);
					break;
			}
		}
		if (RemainingJobs.empty() && !PendingQueue)
		{
			break;
		}
		ZEN_INFO("{} jobs active, {} pending in queue, {} running",
				 RemainingJobs.size(),
				 PendingCount,
				 RemainingJobs.size() - PendingCount);
		Sleep(100);
	}
	JobsLatch.Wait();
}
#endif

}  // namespace zen
